πŸ•ΈοΈ Ada Research Browser

bt-08-alerting.md
← Back

BT-08: Alert Engine

Goal: Implement multi-channel alerting with severity-based routing, audit failure detection, and alert persistence.

Files:
- Create: /opt/security-blue-team/blueteam/alerting/engine.py
- Create: /opt/security-blue-team/blueteam/alerting/channels.py
- Create: /opt/security-blue-team/tests/test_alerting.py

Depends on: BT-07


Step 1: Write tests

# tests/test_alerting.py
from blueteam.alerting.engine import AlertEngine
from blueteam.models import SecurityIncident

def test_alert_engine_routes_by_severity():
    """Check that each severity is dispatched to the channels ROUTING lists.

    The engine's channels and its DB write are replaced with in-memory
    stand-ins, so the test produces no syslog entry, console output, e-mail
    or DB connection and can assert on what was actually dispatched.
    """
    engine = AlertEngine(config={"alerting": {"syslog": {"enabled": True}}})

    dispatched = []

    class RecordingChannel:
        """Stand-in channel that records its own name on every send."""

        def __init__(self, name):
            self.name = name

        def send(self, incident):
            dispatched.append(self.name)
            return True

    engine.channels = {
        name: RecordingChannel(name) for name in ("email", "syslog", "console")
    }
    # Persistence needs a database; keep this unit test DB-free.
    engine._persist = lambda incident: None

    def make_incident(severity):
        return SecurityIncident(
            title="Test incident",
            severity=severity,
            detected_by="test",
            nist_controls=["3.3.4"],
        )

    # Should not raise
    engine.alert(make_incident("critical"))
    assert sorted(dispatched) == ["console", "email", "syslog"]

    dispatched.clear()
    engine.alert(make_incident("medium"))
    assert dispatched == ["syslog"]

    dispatched.clear()
    engine.alert(make_incident("info"))
    assert dispatched == []

def test_alert_persisted_to_db():
    """Placeholder for the alert-persistence integration test; does nothing yet."""
    # Requires DB — integration test
    return None

Step 2: Implement alert channels

# blueteam/alerting/channels.py
"""Alert delivery channels."""
import logging
import smtplib
import syslog
from abc import ABC, abstractmethod
from email.mime.text import MIMEText

from blueteam.models import SecurityIncident

class AlertChannel(ABC):
    """Abstract interface every alert delivery channel implements."""

    @abstractmethod
    def send(self, incident: SecurityIncident) -> bool:
        """Deliver *incident* through this channel.

        Returns a bool; the concrete channels in this module use True for
        "sent" and False for "nothing was sent".
        """

class SyslogChannel(AlertChannel):
    """Alert channel that writes one syslog message per incident."""

    # Incident severity name -> syslog priority.
    SEVERITY_MAP = dict(
        critical=syslog.LOG_CRIT,
        high=syslog.LOG_ERR,
        medium=syslog.LOG_WARNING,
        low=syslog.LOG_NOTICE,
        info=syslog.LOG_INFO,
    )

    def send(self, incident: SecurityIncident) -> bool:
        """Log *incident* with ident ``eqmon-blueteam`` on facility LOCAL6.

        A severity missing from SEVERITY_MAP is logged at LOG_WARNING.
        Always returns True.
        """
        level = self.SEVERITY_MAP.get(incident.severity, syslog.LOG_WARNING)

        # The log is opened and closed around every message.
        syslog.openlog("eqmon-blueteam", syslog.LOG_PID, syslog.LOG_LOCAL6)
        line = (
            f"SECURITY_INCIDENT [{incident.severity.upper()}] "
            f"{incident.title} | detected_by={incident.detected_by} "
            f"nist={','.join(incident.nist_controls)} "
            f"cui={incident.cui_involved}"
        )
        syslog.syslog(level, line)
        syslog.closelog()

        return True

class EmailChannel(AlertChannel):
    """Alert channel that e-mails a plain-text incident summary over SMTP.

    Config keys read (all optional): ``smtp_host`` (default ``localhost``),
    ``smtp_port`` (587), ``from_address`` (``blueteam@eqmon.local``),
    ``recipients`` (list of addresses, or a single address string) and
    ``timeout`` in seconds (10).
    """

    def __init__(self, config: dict):
        self.smtp_host = config.get("smtp_host", "localhost")
        self.smtp_port = config.get("smtp_port", 587)
        self.from_addr = config.get("from_address", "blueteam@eqmon.local")
        recipients = config.get("recipients") or []
        # A bare string would otherwise be joined character by character when
        # the To header is built.
        if isinstance(recipients, str):
            recipients = [recipients]
        self.recipients = recipients
        # Bound the connection so an unresponsive mail server cannot stall
        # the caller indefinitely.
        self.timeout = config.get("timeout", 10)

    def send(self, incident: SecurityIncident) -> bool:
        """E-mail *incident* to the configured recipients.

        Returns:
            True once the message has been handed to the SMTP server; False
            when no recipients are configured or the SMTP exchange fails
            (the failure is logged rather than raised).
        """
        if not self.recipients:
            return False
        # Header values must stay on a single line: fold any line breaks in
        # the title into spaces so it cannot inject extra headers.
        subject = " ".join(
            f"[{incident.severity.upper()}] {incident.title}".splitlines()
        )
        body = (
            f"Security Incident Detected\n"
            f"{'=' * 40}\n"
            f"Title: {incident.title}\n"
            f"Severity: {incident.severity}\n"
            f"Detected by: {incident.detected_by}\n"
            f"NIST Controls: {', '.join(incident.nist_controls)}\n"
            f"CUI Involved: {incident.cui_involved}\n\n"
            f"Description:\n{incident.description}\n"
        )
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = self.from_addr
        msg["To"] = ", ".join(self.recipients)
        # NOTE(review): port 587 usually expects STARTTLS and a login; neither
        # is done here - confirm the configured relay accepts plain submission.
        try:
            with smtplib.SMTP(
                self.smtp_host, self.smtp_port, timeout=self.timeout
            ) as smtp:
                smtp.send_message(msg)
        except Exception:
            # Best effort: a mail outage must not break the caller, but it
            # must leave a trace instead of vanishing silently.
            logging.getLogger(__name__).exception(
                "Email alert via %s:%s failed", self.smtp_host, self.smtp_port
            )
            return False
        return True

class ConsoleChannel(AlertChannel):
    """Alert channel that prints the incident as a styled Rich panel on stderr."""

    # Severity -> Rich style of the panel; any other severity is drawn white.
    _STYLES = {
        "critical": "bold red",
        "high": "red",
        "medium": "yellow",
        "low": "blue",
    }

    def send(self, incident: SecurityIncident) -> bool:
        """Print *incident* to stderr. Always returns True.

        Incident fields are escaped before being placed into the panel text,
        so square brackets in a title or description are shown literally
        instead of being parsed as Rich markup tags.
        """
        # Imported here so that rich is only loaded when a console alert is
        # actually printed.
        from rich.console import Console
        from rich.markup import escape
        from rich.panel import Panel

        def literal(value) -> str:
            return escape(str(value))

        console = Console(stderr=True)
        style = self._STYLES.get(incident.severity, "white")
        console.print(Panel(
            f"[bold]{literal(incident.title)}[/bold]\n"
            f"Severity: {literal(incident.severity)} | Rule: {literal(incident.detected_by)}\n"
            f"NIST: {literal(', '.join(incident.nist_controls))}\n"
            f"{literal(incident.description)}",
            title="SECURITY ALERT",
            style=style,
        ))
        return True

Step 3: Implement alert engine with severity routing

# blueteam/alerting/engine.py
"""Alert routing engine with severity-based channel selection."""
from blueteam.models import SecurityIncident
from blueteam.alerting.channels import SyslogChannel, EmailChannel, ConsoleChannel
from blueteam import db as db_module

# Severity → names of the channels an incident of that severity is sent to,
# in dispatch order. "info" incidents are sent to no channel.
ROUTING = dict(
    critical=["email", "syslog", "console"],
    high=["email", "syslog"],
    medium=["syslog"],
    low=["syslog"],
    info=[],
)

class AlertEngine:
    """Dispatch incidents to alert channels by severity and record them in the DB.

    Channels are built once from ``config["alerting"]``: syslog and email only
    when their ``enabled`` flag is truthy, console always.
    """

    def __init__(self, config: dict):
        self.config = config
        self.channels = {}
        # "or {}" also covers a section that is present but empty (None).
        alert_cfg = config.get("alerting") or {}
        if (alert_cfg.get("syslog") or {}).get("enabled"):
            self.channels["syslog"] = SyslogChannel()
        if (alert_cfg.get("email") or {}).get("enabled"):
            self.channels["email"] = EmailChannel(alert_cfg["email"])
        self.channels["console"] = ConsoleChannel()

    def alert(self, incident: SecurityIncident):
        """Route alert to appropriate channels based on severity.

        Every channel ROUTING lists for the incident's severity that is also
        configured on this engine is tried in order. A channel that raises or
        returns False is reported on stderr and does not stop the remaining
        channels. The incident is persisted afterwards in all cases.
        """
        import sys

        for ch_name in ROUTING.get(incident.severity, []):
            channel = self.channels.get(ch_name)
            if channel is None:
                continue  # routed to a channel that is not enabled
            try:
                delivered = channel.send(incident)
            except Exception as e:
                print(f"Alert channel {ch_name} failed: {e}", file=sys.stderr)
                continue
            # Channels signal "nothing was sent" with False instead of raising.
            if delivered is False:
                print(
                    f"Alert channel {ch_name} did not deliver the alert",
                    file=sys.stderr,
                )
        self._persist(incident)

    def _persist(self, incident: SecurityIncident):
        """Save alert to blueteam.alert_history.

        Stores the incident's detected_by (as rule_id), severity, title and
        description. Any failure is reported on stderr and otherwise ignored.
        """
        import sys

        try:
            conn = db_module.get_connection(self.config)
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO blueteam.alert_history
                        (rule_id, severity, title, description)
                    VALUES (%s, %s, %s, %s)
                """, (incident.detected_by, incident.severity,
                      incident.title, incident.description))
            # NOTE(review): assumes a DB-API connection that is not in
            # autocommit mode, where an uncommitted INSERT is rolled back;
            # commit() is a no-op if autocommit is on. The connection is left
            # open because it may be shared - confirm against blueteam.db.
            conn.commit()
        except Exception as e:
            # Don't crash monitoring over DB write failure, but leave a trace.
            print(f"Alert persistence failed: {e}", file=sys.stderr)

Step 4: Run tests, commit

# Run the alerting test module with per-test output.
python -m pytest --verbose tests/test_alerting.py
# Stage every change in the working tree, then commit it.
git add --all
git commit --message "feat: alert engine with syslog, email, console channels (NIST 3.3.4)"